/****************************************************************
 *								*
 *	Copyright 2005, 2008 Fidelity Information Services, Inc	*
 *								*
 *	This source code contains the intellectual property	*
 *	of its copyright holder(s), and is made available	*
 *	under a license.  If you do not know the terms of	*
 *	the license, please stop and do not read further.	*
 *								*
 ****************************************************************/

/****************************************************************
        dbcertify_certify_phase2.c - Database certification phase 2

        - Verify phase 1 input file.
        - Locate and open database after getting standalong access.
        - Read the identified blocks in and if they are still too
          large, split them.
        - Certify the database as "clean" if no errors encountered.

        Note: Most routines in this utility are self-contained
              meaning they do not reference GT.M library routines
              (with some notable exceptions). This is because
              phase-2 is going to run against V4 format databases
              but any linked routines would be compiled for V5
              databases.
****************************************************************/

#include "mdef.h"

#ifdef VMS
#include <descrip.h>
#include <rms.h>
#include <ssdef.h>
#endif

#include <errno.h>
#include "gtm_stat.h"
#include "gtm_ctype.h"
#include "gtm_stdio.h"
#include "gtm_string.h"
#include "gtm_unistd.h"
#include "gtm_stdlib.h"
#include "gtm_fcntl.h"

#include "gtmio.h"
#include "cli.h"
#include "copy.h"
#include "iosp.h"
#include "gdsroot.h"
#include "v15_gdsroot.h"
#include "gtm_facility.h"
#include "fileinfo.h"
#include "gdsbt.h"
#include "v15_gdsbt.h"
#include "gdsfhead.h"
#include "v15_gdsfhead.h"
#include "filestruct.h"
#include "v15_filestruct.h"
#include "gdsblk.h"
#include "gdsbml.h"
#include "gdscc.h"
#include "bmm_find_free.h"
#include "gdsblkops.h"
#include "bit_set.h"
#include "bit_clear.h"
#include "min_max.h"
#include "gtmmsg.h"
#ifdef VMS
#  include "is_file_identical.h"
#endif
#include "error.h"
#include "mupip_exit.h"
#include "util.h"
#include "dbcertify.h"

GBLREF	char_ptr_t		update_array, update_array_ptr;
GBLREF	uint4			update_array_size;
GBLREF	VSIG_ATOMIC_T		forced_exit;			/* Signal came in while we were in critical section */
GBLREF	int4			exi_condition;
GBLREF	phase_static_area	*psa_gbl;

boolean_t dbc_split_blk(phase_static_area *psa, block_id blk_num, enum gdsblk_type blk_type, v15_trans_num tn, int blk_levl);
void dbc_flush_fhead(phase_static_area *psa);
void dbc_read_p1out(phase_static_area *psa, void *obuf, int olen);

error_def(ERR_DEVOPENFAIL);
error_def(ERR_FILENOTFND);
error_def(ERR_DBCSCNNOTCMPLT);
error_def(ERR_DBCBADFILE);
error_def(ERR_DBCMODBLK2BIG);
error_def(ERR_DBCINTEGERR);
error_def(ERR_DBCNOEXTND);
error_def(ERR_DBCDBCERTIFIED);
error_def(ERR_DBCNOTSAMEDB);
error_def(ERR_DBCDBNOCERTIFY);
error_def(ERR_DBCREC2BIGINBLK);
error_def(ERR_SYSCALL);
error_def(ERR_TEXT);
error_def(ERR_BITMAPSBAD);
error_def(ERR_MUPCLIERR);

/* The final certify phase of certification process */
void dbcertify_certify_phase(void)
{
	int		save_errno, len, rc, restart_cnt, maxkeystructsize;
	uint4		rec_num;
	char_ptr_t	errmsg;
	boolean_t	restart_transaction, p1rec_read;
	unsigned short	buff_len;
	char		ans[2];
	unsigned char	dbfn[MAX_FN_LEN + 1];
	file_control	*fc;
	phase_static_area *psa;

	psa = psa_gbl;
	DBC_DEBUG(("DBC_DEBUG: Beginning certification phase\n"));
	psa->phase_one = FALSE;
	UNIX_ONLY(atexit(dbc_certify_phase_cleanup));
	psa->block_depth = psa->block_depth_hwm = -1;		/* Initialize no cache */

	/* Check parsing results */
	if (CLI_PRESENT == cli_present("BLOCKS"))
	{
		if (!cli_get_hex("BLOCKS", &psa->blocks_to_process))
			exit(1);		/* Error message already raised */
	} else
		psa->blocks_to_process = MAXTOTALBLKS_V4;
	if (CLI_PRESENT == cli_present("TEMPFILE_DIR"))
	{	/* Want to put temp files in this directory */
		buff_len = sizeof(psa->tmpfiledir) - 1;
		if (0 == cli_get_str("TEMPFILE_DIR", (char_ptr_t)psa->tmpfiledir, &buff_len))
			mupip_exit(ERR_MUPCLIERR);
	}
	psa->keep_temp_files = (CLI_PRESENT == cli_present("KEEP_TEMPS"));
	buff_len = sizeof(psa->outfn) - 1;
	if (0 == cli_get_str("P1OUTFILE", (char_ptr_t)psa->outfn, &buff_len))
		mupip_exit(ERR_MUPCLIERR);

	/* Open phase-1 output file (our input file) */
	psa->outfd = OPEN((char_ptr_t)psa->outfn, O_RDONLY RMS_OPEN_BIN);
	if (-1 == psa->outfd)
	{
		save_errno = errno;
		if (save_errno == ENOENT)
			rts_error(VARLSTCNT(4) ERR_FILENOTFND, 2, RTS_ERROR_STRING((char_ptr_t)psa->outfn));
		else
		{
			errmsg = STRERROR(save_errno);
			rts_error(VARLSTCNT(8) ERR_DEVOPENFAIL, 2, RTS_ERROR_STRING((char_ptr_t)psa->outfn),
				  ERR_TEXT, 2, RTS_ERROR_STRING(errmsg));
		}
	}
	dbc_read_p1out(psa, &psa->ofhdr, sizeof(p1hdr));		/* Read phase 1 output file header */
	if (0 != memcmp(psa->ofhdr.p1hdr_tag, P1HDR_TAG, sizeof(psa->ofhdr.p1hdr_tag)))
		rts_error(VARLSTCNT(4) ERR_DBCBADFILE, 2, RTS_ERROR_STRING((char_ptr_t)psa->outfn));
	if (0 == psa->ofhdr.tot_blocks)
		/* Sanity check that the output file was finished and completed */
		rts_error(VARLSTCNT(4) ERR_DBCSCNNOTCMPLT, 2, RTS_ERROR_STRING((char_ptr_t)psa->outfn));
	assert(0 != psa->ofhdr.tn);

	/* Check if region name still associates to the same file */
	dbc_find_database_filename(psa, psa->ofhdr.regname, dbfn);

	/* Notify user this is a critical change and give them the opportunity to abort */
	util_out_print("--------------------------------------------------------------------------------", FLUSH);
	util_out_print("You must have a backup of database !AD before you proceed!!", FLUSH,
		       RTS_ERROR_STRING((char_ptr_t)psa->ofhdr.dbfn));
	util_out_print("An abnormal termination can damage the database while doing the certification !!", FLUSH);
	util_out_print("Proceeding will also turn off replication and/or journaling if enabled", FLUSH);
	util_out_print("--------------------------------------------------------------------------------", FLUSH);
	util_out_print("Proceed? [y/n]:", FLUSH);
	SCANF("%1s", ans);	/* We only need one char, any more would overflow our buffer */
	if ('y' != ans[0] && 'Y' != ans[0])
	{
		util_out_print("Certification phase aborted\n", FLUSH);
		return;
	}
	util_out_print("Certification phase for database !AD beginning", FLUSH, RTS_ERROR_STRING((char_ptr_t)psa->ofhdr.dbfn));

	/* Build database structures */
	psa->dbc_gv_cur_region = malloc(sizeof(gd_region));
	memset(psa->dbc_gv_cur_region, 0, sizeof(gd_region));

	psa->dbc_gv_cur_region->dyn.addr = malloc(sizeof(gd_segment));
	memset(psa->dbc_gv_cur_region->dyn.addr, 0, sizeof(gd_segment));
	psa->dbc_gv_cur_region->dyn.addr->acc_meth = dba_bg;
	len = STRLEN((char_ptr_t)psa->ofhdr.dbfn);
	strcpy((char_ptr_t)psa->dbc_gv_cur_region->dyn.addr->fname, (char_ptr_t)dbfn);
	psa->dbc_gv_cur_region->dyn.addr->fname_len = len;

	psa->dbc_gv_cur_region->dyn.addr->file_cntl = malloc(sizeof(*psa->dbc_gv_cur_region->dyn.addr->file_cntl));
	memset(psa->dbc_gv_cur_region->dyn.addr->file_cntl, 0, sizeof(*psa->dbc_gv_cur_region->dyn.addr->file_cntl));
	psa->dbc_gv_cur_region->dyn.addr->file_cntl->file_type = dba_bg;

	psa->dbc_gv_cur_region->dyn.addr->file_cntl->file_info = malloc(sizeof(GDS_INFO));
	memset(psa->dbc_gv_cur_region->dyn.addr->file_cntl->file_info, 0, sizeof(GDS_INFO));

	psa->dbc_cs_data = malloc(sizeof(*psa->dbc_cs_data));
	fc = psa->fc = psa->dbc_gv_cur_region->dyn.addr->file_cntl;
	fc->file_type = psa->dbc_gv_cur_region->dyn.addr->acc_meth = dba_bg;	/* Always treat as BG mode */

	/* Initialize for db processing - open and read in file-header, get "real" filename for comparison */
	dbc_init_db(psa);
	if (0 != strcmp((char_ptr_t)psa->dbc_gv_cur_region->dyn.addr->fname, (char_ptr_t)psa->ofhdr.dbfn))
		/* File name change means db was moved or at least is not as it was when it was scanned */
		rts_error(VARLSTCNT(1) ERR_DBCNOTSAMEDB);
	if (psa->ofhdr.tn > psa->dbc_cs_data->trans_hist.curr_tn)
		rts_error(VARLSTCNT(1) ERR_DBCNOTSAMEDB);
	psa->max_blk_len = psa->dbc_cs_data->blk_size - psa->dbc_cs_data->reserved_bytes;

	/* Initialize maximum key we may need later if we encounter gvtroot blocks */
	maxkeystructsize = sizeof(dbc_gv_key) + MAX_DBC_KEY_SZ - 1;
	psa->max_key = malloc(maxkeystructsize);
	memset(psa->max_key, 0, maxkeystructsize);
	psa->max_key->top = maxkeystructsize;
	psa->max_key->gvn_len = 1;
	*psa->max_key->base = (unsigned char)0xFF;
	/* Key format: 0xFF, 0x00, 0x00 : This is higher than any valid key would be */
	psa->max_key->end = MAX_DBC_KEY_SZ - 1;

	/* Allocate update array based on fileheader values */
	psa->dbc_cs_data->max_update_array_size = psa->dbc_cs_data->max_non_bm_update_array_size =
		(uint4)ROUND_UP2(MAX_NON_BITMAP_UPDATE_ARRAY_SIZE(psa->dbc_cs_data), UPDATE_ARRAY_ALIGN_SIZE);
	psa->dbc_cs_data->max_update_array_size += ROUND_UP2(MAX_BITMAP_UPDATE_ARRAY_SIZE, UPDATE_ARRAY_ALIGN_SIZE);
	update_array = malloc(psa->dbc_cs_data->max_update_array_size);
	update_array_size = psa->dbc_cs_data->max_update_array_size;

	/* Now to the real work -- Read and split each block the phase 1 file recorded that still needs
	   to be split (concurrent updates may have "fixed" some blocks).
	*/
	psa->hint_blk = psa->hint_lcl = 1;
	restart_transaction = p1rec_read = FALSE;
	restart_cnt = 0;
	for (rec_num = 0; rec_num < psa->ofhdr.blk_count || 0 < psa->gvtroot_rchildren_cnt;)
	{	/* There is the possibility that we are restarting the processing of a given record. In
		   that case we will not read the next record in but process what is already in the buffer.
		   This can occur if we have extended the database. */
		if (!restart_transaction)
		{	/* First to check is if we have any queued gvtroot_rchildren to process (described elsewhere). If we have
			   these, we process them now without bumping the record count.
			*/
			p1rec_read = FALSE;	/* Assume we did NOT read from the file */
			if (0 < psa->gvtroot_rchildren_cnt)
			{
				psa->gvtroot_rchildren_cnt--;
				memcpy((char *)&psa->rhdr, (char *)&psa->gvtroot_rchildren[psa->gvtroot_rchildren_cnt],
				       sizeof(p1rec));
				psa->gvtrchildren++;	/* counter */
				DBC_DEBUG(("DBC_DEBUG: Pulling p1rec from queued gvtroot_rchildren array (%d)\n",
					   psa->gvtroot_rchildren_cnt));
			} else
			{	/* Normal processing - read record from phase one file */
				if (rec_num == psa->blocks_to_process)
				{	/* Maximum records processed */
					DBC_DEBUG(("DBC_DEBUG: Maximum records to process limit reached "
						   "- premature exit to main loop\n"));
					break;
				}
				DBC_DEBUG(("DBC_DEBUG: ****************** Reading new p1out record (%d) *****************\n", \
					   (rec_num + 1)));
				dbc_read_p1out(psa, &psa->rhdr, sizeof(p1rec));
				if (0 != psa->rhdr.akey_len)
				{	/* This module does not need the ascii key so just bypass it if it exists */
					if (0 != psa->rhdr.blk_levl || sizeof(psa->rslt_buff) < psa->rhdr.akey_len )
						GTMASSERT;		/* Must be corrupted file? */
					dbc_read_p1out(psa, (char_ptr_t)psa->rslt_buff, psa->rhdr.akey_len);
				}
				p1rec_read = TRUE;	/* Note, not reset by restarted transaction */
			}
			/* Don't want to reset the high water mark on a restarted transaction */
			if (psa->block_depth > psa->block_depth_hwm)
				psa->block_depth_hwm = psa->block_depth;	/* Keep track of maximum indexes we have used */
			restart_cnt = 0;
		} else
		{
			++restart_cnt;
			if (MAX_RESTART_CNT < restart_cnt)
				GTMASSERT;			/* No idea what could cause this.. */
			DBC_DEBUG(("DBC_DEBUG: ****************** Restarted transaction (%d) *****************\n", \
				   (rec_num + 1)));
			/* "restart_transaction" is either set or cleared by dbc_split_blk() below */
		}
		assert((int)psa->rhdr.blk_type);
		/* Note assignment in "if" below */
		if (restart_transaction = dbc_split_blk(psa, psa->rhdr.blk_num, psa->rhdr.blk_type,
							psa->rhdr.tn, psa->rhdr.blk_levl))
			psa->block_depth_hwm = -1;	/* Zaps cache so all blocks are re-read */
		else if (p1rec_read)			/* If rec processed was from scan phase, bump record counter */
			rec_num++;
	} /* for each record in phase-1 output file or each restart or each queued rh child */

	/* Reaching this point, the database has been updated, with no errors. We can now certify
	   this database as ready for the current version of GT.M
	*/
	util_out_print("", FLUSH);	/* New line for below message in case MUPIP extension leaves prompt */
	if (0 == psa->blk_process_errors)
	{
		if (psa->blocks_to_process != rec_num)
		{
			((sgmnt_data_ptr_t)psa->dbc_cs_data)->certified_for_upgrade_to = GDSV5;
			psa->dbc_fhdr_dirty = TRUE;
			gtm_putmsg(VARLSTCNT(6) ERR_DBCDBCERTIFIED, 4, RTS_ERROR_STRING((char_ptr_t)psa->ofhdr.dbfn),
				   RTS_ERROR_LITERAL("GT.M V5"));
		} else
		{
			DBC_DEBUG(("DBC_DEBUG: Database certification bypassed due to records to process limit being reached\n"));
		}
	} else
		gtm_putmsg(VARLSTCNT(4) ERR_DBCDBNOCERTIFY, 2, RTS_ERROR_STRING((char_ptr_t)psa->ofhdr.dbfn));

	dbc_flush_fhead(psa);
	dbc_close_db(psa);
	CLOSEFILE(psa->outfd, rc);

	PRINTF("\n");
	PRINTF("Total blocks in scan phase file --   %12d [0x%08x]\n", psa->ofhdr.blk_count, psa->ofhdr.blk_count);
	PRINTF("Blocks bypassed ------------------   %12d [0x%08x]\n", psa->blks_bypassed, psa->blks_bypassed);
	PRINTF("Blocks processed -----------------   %12d [0x%08x]\n", psa->blks_processed, psa->blks_processed);
	PRINTF("Blocks read ----------------------   %12d [0x%08x]\n", psa->blks_read, psa->blks_read);
	PRINTF("Blocks read from cache -----------   %12d [0x%08x]\n", psa->blks_cached, psa->blks_cached);
	PRINTF("Blocks updated -------------------   %12d [0x%08x]\n", psa->blks_updated, psa->blks_updated);
	PRINTF("Blocks created -------------------   %12d [0x%08x]\n", psa->blks_created, psa->blks_created);
	PRINTF("GVTROOT right children processed -   %12d [0x%08x]\n", psa->gvtrchildren, psa->gvtrchildren);

	/* Release resources */
	free(update_array);
	free(psa->dbc_cs_data);
#ifdef VMS
	/* Some extra freeing of control blocks on VMS */
	if (NULL != FILE_INFO(psa->dbc_gv_cur_region)->fab)
		free(FILE_INFO(psa->dbc_gv_cur_region)->fab);
	if (NULL != FILE_INFO(psa->dbc_gv_cur_region)->nam)
	{
		if (NULL != FILE_INFO(psa->dbc_gv_cur_region)->nam->nam$l_esa)
			free(FILE_INFO(psa->dbc_gv_cur_region)->nam->nam$l_esa);
		free(FILE_INFO(psa->dbc_gv_cur_region)->nam);
	}
	if (NULL != FILE_INFO(psa->dbc_gv_cur_region)->xabfhc)
		free(FILE_INFO(psa->dbc_gv_cur_region)->xabfhc);
	if (NULL != FILE_INFO(psa->dbc_gv_cur_region)->xabpro)
		free(FILE_INFO(psa->dbc_gv_cur_region)->xabpro);
#endif
	free(psa->dbc_gv_cur_region->dyn.addr->file_cntl->file_info);
	free(psa->dbc_gv_cur_region->dyn.addr->file_cntl);
	free(psa->dbc_gv_cur_region->dyn.addr);
	free(psa->dbc_gv_cur_region);
	psa->dbc_gv_cur_region = NULL;
	if (psa->first_rec_key)
		free(psa->first_rec_key);
	free(psa);
}

/* Routine to handle the processing (splitting) of a given database block. If the current block process needs
   to be restarted, this function returns TRUE. else if processing completed normally, returns FALSE.

   Routine notes:

   This routine implements a "simplistic" mini database engine. It is "simplistic" in the regards to fact that it
   doesn't need to worry about concurrency issues. It also has one design assumption that we will NEVER add a
   record to a level 0 block (either DT or GVT). Because of this assumption, many complications from gvcst_put(), on
   which it is largely based, were non-issues and were removed (e.g. no TP). This routine has its own concepts of
   "cache", cw_set elements, update arrays, etc. Following is a brief description of how these things are implemented
   in this routine:

   The primary control block in this scheme is the block_info block which serves as a cache record, change array anchor,
   gv_target, and so on. In short, everything that is known about a given database block is contained in this one
   structure. There is an array of these structures with the name "blk_set" which is a global variable array dimensioned
   at a thoroughly outrageous amount for the worst case scenario.

   There are areas within the blk_set array that are worth describing:

   - The block_depth global variable always holds the top in use index into blk_set.
   - blk_set[0] describes the block that was fed to us from the phase 1 scan. It is the primary block that needs to
     be split. If nothing needs to happen to it, we go to the next record and blk_set[0] get a new block in it.
   - Starting with blk_set[1] through blk_set[bottom_tree_index] are first the directory tree (DT) blocks and then
     (if primary was a GVT block) the global variable tree (GVT) blocks.
   - Starting with blk_set[bottom_tree_index + 1] through blk_set[bottom_created_index] are newly created blocks during
     split processing.
   - Starting with blk_set[bottom_created_index + 1] through blk_set[block_depth] are local bit map blocks that are being
     modified for the "transaction".

   This engine has a very simple cache mechanism. If a block we need is somewhere in the blk_set array (a global variable
   block_depth_hwm maintains a high water mark), the cache version is used rather than forcing a re-read from disk. It is
   fairly simple but seems to save a lot of reads, especially of the directory tree and the local bit_maps.

   Like gvcst_put(), once we have the blocks from the tree loaded, they are processed in reverse order as a split in one
   block requires a record to be inserted into the parent block. We start with the primary block (blk_set[0]) and then
   move to blk_set[bottom_tree_index] and work backwards from there until either we get to a block for which there are
   no updates or we hit a root block (GVT or DT depending) at which time we are done with the primary update loop.

   After performing the block splits and creating new blocks, we double check that we have room to hold them all. If not,
   we make a call to MUPIP EXTEND to extend the database for us. Since this means we have to close the file and give up
   our locks on it, we also restart the transaction and force all blocks to be re-read from disk.

   Once assured we have sufficient free blocks, we start at blk_set[bottom_created_index] and work down to
   blk_set[bottom_tree_index + 1] allocating and assigning block numbers to the created blocks. Part of this process also
   puts the block numbers into places where the update arrays will pick them up when the referencing blocks are built.

   Once all the new blocks have been assigned, we loop through blk_set[bottom_tree_index] to blk_set[0] and create the
   new versions of the blocks (for those blocks marked as being updated). A note here is that this engine does not build
   update array entries for bitmap blocks, preferring instead to just update the local bitmap block buffers directly.

   The last major loop is to write to disk all the new and changed blocks to disk. There is no processing but simple IO
   in this loop to minimize the potential of something going wrong. There is no recovery at this point. If this loop fails
   in mid-stream, the database is toast.

*/
boolean_t dbc_split_blk(phase_static_area *psa, block_id blk_num, enum gdsblk_type blk_type, v15_trans_num tn, int blk_levl)
{
	int		blk_len, blk_size, restart_cnt, save_block_depth;
	int		gvtblk_index, dtblk_index, blk_index, bottom_tree_index, bottom_created_index;
	int		curr_blk_len, curr_blk_levl, curr_rec_len, ins_key_len, ins_rec_len;
	int		curr_rec_shrink, curr_rec_offset, blks_this_lmap;
	int		prev_rec_offset, new_blk_len, new_rec_len, remain_offset, remain_len, blk_seg_cnt;
	int		new_lh_blk_len, new_rh_blk_len, created_blocks, extent_size;
	int		local_map_max, lbm_blk_index, lcl_blk, curr_rec_cmpc, cmpc;
	int4		lclmap_not_full;
	uint4		total_blks;
	boolean_t	dummy_bool;
	boolean_t	got_root, level_0, completed, insert_point, restart_transaction;
	blk_segment	*bs_ptr, *bs1, *blk_sega_p, *blk_array_top;
	rec_hdr_ptr_t	ins_rec_hdr, next_rec_hdr, new_star_hdr;
	dbc_gv_key	*last_rec_key;
	uchar_ptr_t	rec_p, next_rec_p, mid_point, cp1, lcl_map_p, new_blk_p, blk_p, blk_endp, chr_p;
	unsigned short	us_rec_len;
	v15_trans_num	curr_tn;
	block_id	blk_ptr;
	block_id	bitmap_blk_num, *lhs_block_id_p, *rhs_block_id_p, allocated_blk_num;
	block_info	*blk_set_p, *blk_set_new_p, *blk_set_prnt_p, *blk_set_bm_p, *blk_set_rhs_p;
	block_info	restart_blk_set;

	DEBUG_ONLY(
		boolean_t	first_time = FALSE;
	)

	/* First order of business is to read the required block in */
	psa->block_depth = -1;
	blk_size = psa->dbc_cs_data->blk_size;	/* BLK_FINI macro needs a local copy */
	dbc_read_dbblk(psa, blk_num, blk_type);

	/* Now that we have read the block in, let us see if it is still a "problem" block. If its
	   TN has changed, that is an indicator that is should NOT be a problem block any longer
	   with the sole exception of a TN RESET having been done on the DB since phase 1. In that
	   case, we will still insist on a phase 1 rerun as some of our sanity checks have disappeared.
	*/
	assert(0 == psa->block_depth);
	blk_p = psa->blk_set[0].old_buff;
	assert(blk_p);
	blk_len = psa->blk_set[0].blk_len;

	/* If the block is still too large, sanity check on TN at phase 1 and now. Note that it is
	   possible in an index block for the TN to have changed yet the block is otherwise unmodified
	   if (1) this is an index block and (2) a record is being inserted before the first record in
	   the block. In this case, the new record is put into the new (LH) sibling and the entire existing
	   block is put unmodified into the RH side in the existing block. The net result is that only
	   the TN changes in this block and if the block is too full it is not split. This will never
	   happen for a created block though. It can only hapen for existing index blocks. Note if the
	   block is not (still) too full that we cannot yet say this block has nothing to happen to it
	   because if it is a gvtroot block, we need to record its right side children further down.
	*/
	GET_ULONG(curr_tn, &((v15_blk_hdr_ptr_t)blk_p)->tn);
	if ((UNIX_ONLY(8) VMS_ONLY(9) > blk_size - blk_len) && (curr_tn != tn) && (gdsblk_gvtleaf == blk_type))
	{
		/* Block has been modified: Three possible reasons it is not fixed:
		   1) The user was playing with reserved bytes and set it too low allowing some
		   large blocks to be created we did not know about (but thankfully just caught).
		   2) User ran a recover after running phase 1 that re-introduced some too-large
		   blocks. This is a documented no-no but we have no way to enforce it on V4.
		   3) There was a TN reset done.
		   All three of these causes require a rerun of the scan phase.
		*/
		rts_error(VARLSTCNT(3) ERR_DBCMODBLK2BIG, 1, blk_num);
	}

	/* Isolate the full key in the first record of the block */
	dbc_init_key(psa, &psa->first_rec_key);
	dbc_find_key(psa, psa->first_rec_key, blk_p + sizeof(v15_blk_hdr), psa->blk_set[0].blk_levl);
	psa->first_rec_key->gvn_len = USTRLEN((char_ptr_t)psa->first_rec_key->base);	/* The GVN we need to lookup in the DT */

	/* Possibilities at this point:
	   1) We are looking for a DT (directory tree) block.
	   2) We are looking for a GVT (global variable tree) block.

	   We lookup first_rec_key in the directory tree. If (1) we pass the block level we are searching for
	   as a parameter. If (2), we pass -1 as the block level we are searching for as we need a complete
	   search of the leaf level DT in order to find the GVN.

	   If (1) then the lookup is complete and verification and (later) block splitting can begin. If (2), we need to
	   take the pointer from the found DT record which points to the GVT root block and start our search again
	   from there using the level from the original block as a stopping point. One special case here is if our
	   target block was a gvtroot block, we don't need to traverse the GVT tree to find it. We get it from the
	   directory tree and stop our search there.
	*/
	switch(blk_type)
	{
		case gdsblk_dtindex:
		case gdsblk_dtleaf:
		case gdsblk_dtroot:
			/* Since our search is to end in the dt tree, stop when we get to the requisite level */
			blk_index = dbc_find_dtblk(psa, psa->first_rec_key, blk_levl);
			if (0 > blk_index)
			{	/* Integrity error encountered or record not found. We cannot proceed */
				assert(FALSE);
				rts_error(VARLSTCNT(8) ERR_DBCINTEGERR, 2, RTS_ERROR_STRING((char_ptr_t)psa->ofhdr.dbfn),
					  ERR_TEXT, 2,
					  RTS_ERROR_LITERAL("Unable to find index (DT) record for an existing global"));
			}
			break;
		case gdsblk_gvtindex:
		case gdsblk_gvtleaf:
			/* Search all the way down to lvl 0 to get a dtleaf block */
			dtblk_index = dbc_find_dtblk(psa, psa->first_rec_key, 0);
			if (0 > dtblk_index)
			{	/* Integrity error encountered or record not found. We cannot proceed */
				assert(FALSE);
				rts_error(VARLSTCNT(8) ERR_DBCINTEGERR, 2, RTS_ERROR_STRING((char_ptr_t)psa->ofhdr.dbfn),
					  ERR_TEXT, 2, RTS_ERROR_LITERAL("Unable to locate DT leaf (root) block"));
			}
			assert(0 == ((v15_blk_hdr_ptr_t)psa->blk_set[dtblk_index].old_buff)->levl);
			/* Note level 0 directory blocks can have collation data in them but it would be AFTER
			   the block pointer which is the first thing in the record after the key.
			*/
			GET_ULONG(blk_ptr, (psa->blk_set[dtblk_index].curr_rec + sizeof(rec_hdr)
					    + psa->blk_set[dtblk_index].curr_blk_key->end + 1
					    - ((rec_hdr *)psa->blk_set[dtblk_index].curr_rec)->cmpc));
			gvtblk_index = dbc_read_dbblk(psa, blk_ptr, gdsblk_gvtroot);
			assert(-1 != gvtblk_index);
			/* If our target block was not the gvtroot block we just read in then we keep scanning for our
			   target record. Otherwise, the scan stops here.
			*/
			if (0 != gvtblk_index)
			{
				blk_index = dbc_find_record(psa, psa->first_rec_key, gvtblk_index, blk_levl, gdsblk_gvtroot, FALSE);
				if (0 > blk_index)
				{
					if (-1 == blk_index)
					{	/* Integrity error encountered. We cannot proceed */
						assert(FALSE);
						rts_error(VARLSTCNT(8) ERR_DBCINTEGERR, 2,
							  RTS_ERROR_STRING((char_ptr_t)psa->ofhdr.dbfn),
							  ERR_TEXT, 2,
							  RTS_ERROR_LITERAL("Unable to find index record for an existing global"));
					} else if (-2 == blk_index)
					{	/* Record was not found. Record has been deleted since we last
						   found it. Elicits a warning message in DEBUG mode but is otherwise ignored.
						*/
						assert(FALSE);
						DBC_DEBUG(("DBC_DEBUG: Block split of blk 0x%x bypassed because its "
							   "key could not be located in the GVT\n", blk_num));
						psa->blks_bypassed++;
						psa->blks_read += psa->block_depth;
						/* Only way to properly update the count of cached records is to run the list
						   and check them.
						*/
						for (blk_index = psa->block_depth, blk_set_p = &psa->blk_set[blk_index];
						     0 <= blk_index;
						     --blk_index, --blk_set_p)
						{	/* Check each block we read */
							if (gdsblk_create != blk_set_p->usage && blk_set_p->found_in_cache)
								psa->blks_cached++;
						}
						return FALSE;	/* No restart necessary */
					} else
						GTMASSERT;
				}
			} else
			{	/* This is a gvtroot block and is the subject of our search */
				blk_index = gvtblk_index;
				assert(gdsblk_gvtroot == psa->blk_set[0].blk_type);
			}
			break;
		default:
			GTMASSERT;
	}
	/* The most recently accessed block (that terminated the search) should be the block
	   we are looking for (which should have been found in the cache as block 0. If not,
	   there is an integrity error and we should not continue.
	*/
	if (0 != blk_index)
	{	/* Integrity error encountered. We cannot proceed */
		assert(FALSE);
		rts_error(VARLSTCNT(8) ERR_DBCINTEGERR, 2, RTS_ERROR_STRING((char_ptr_t)psa->ofhdr.dbfn),
			  ERR_TEXT, 2,
			  RTS_ERROR_LITERAL("Did not locate record in same block as we started searching for"));
	}

	/* If this is a gvtroot type block, we have some extra processing to do. Following is a description of
	   the issue we are addressing here. If a gvtroot block is "too large" and was too large at the time
	   the scan was run, it will of course be identified by the scan as too large. Prior to running the scan,
	   the reserved bytes field was set so no more too-full blocks can be created. But if a gvtroot block is
	   identified by the scan and subsequently has to be split by normal GTM processing before the certify
	   can be done, the too-full part of the block can (in totality) end up in the right hand child of the
	   gvtroot block (not obeying the reserved bytes rule). But the gvtroot block is the only one that was
	   identified by the scan and certify may now miss the too-full block in the right child. Theoretically,
	   the entire right child chain of the gvtroot block can be too full. Our purpose here is that when we
	   have identified a gvtblock as being too full, we pause here to read the right child chain coming off
	   of that block all the way down to (but not including) block level 0. Each of these blocks will be
	   processed to check for being too full. The way we do this is to run the chain and build p1rec entries
	   in the gvtroot_rchildren[] array. When we are at the top of the processing loop, we will take these
	   array entries over records from the phase one input file. We only load up the array if it is empty.
	   Otherwise, the assumption is that we are re-processing and the issue has already been handled.
	*/
	blk_set_p = &psa->blk_set[0];
	if (gdsblk_gvtroot == blk_set_p->blk_type && 0 == psa->gvtroot_rchildren_cnt)
	{
		DBC_DEBUG(("DBC_DEBUG: Encountered gvtroot block (block %d [0x%08x]), finding/queueing children\n",
			   blk_set_p->blk_num, blk_set_p->blk_num));
		save_block_depth = psa->block_depth;	/* These reads are temporary and should not remain in cache so
							   we will restore block_depth after we are done.
							*/
		/* Attempting to locate the maximum possible key for this database should read the list of right
		   children into the cache. Pretty much any returncode from dbc_find_record is possible. We usually
		   aren't going to find the global which may come up as not found or an integrity error or it could
		   possibly even be found. Just go with what it gives us. Not much verification we can do on it.
		*/
		blk_index = dbc_find_record(psa, psa->max_key, 0, 0, gdsblk_gvtroot, TRUE);
		/* Pull children (if any) out of cache and put into queue for later processing */
		for (blk_index = save_block_depth + 1;
		     blk_index <= psa->block_depth && gdsblk_gvtleaf != psa->blk_set[blk_index].blk_type;
		     ++blk_index, ++psa->gvtroot_rchildren_cnt)
		{	/* Fill in p1rec type entry in gvtroot_rchildren[] for later */
			DBC_DEBUG(("DBC_DEBUG: Right child block: blk_index: %d  blk_num: %d [0x%08x]  blk_levl: %d\n",
				   blk_index, psa->blk_set[blk_index].blk_num, psa->blk_set[blk_index].blk_num,
				   psa->blk_set[blk_index].blk_levl));
			psa->gvtroot_rchildren[psa->gvtroot_rchildren_cnt].tn = psa->blk_set[blk_index].tn;
			psa->gvtroot_rchildren[psa->gvtroot_rchildren_cnt].blk_num = psa->blk_set[blk_index].blk_num;
			psa->gvtroot_rchildren[psa->gvtroot_rchildren_cnt].blk_type = psa->blk_set[blk_index].blk_type;
			psa->gvtroot_rchildren[psa->gvtroot_rchildren_cnt].blk_levl = psa->blk_set[blk_index].blk_levl;
			psa->gvtroot_rchildren[psa->gvtroot_rchildren_cnt].akey_len = 0;
		}
		psa->block_depth = save_block_depth;
		blk_index = 0;	/* reset to start *our* work in the very first block */
	}

	/* Now we have done the gvtroot check if we were going to. If this particular block has sufficient room in it
	   we don't need to split it of course.
	*/
	if (UNIX_ONLY(8) VMS_ONLY(9) <= blk_size - blk_len)
	{	/* This block has room now - no longer need to split it */
		DBC_DEBUG(("DBC_DEBUG: Block not processed as it now has sufficient room\n"));
		psa->blks_bypassed++;
		psa->blks_read++;
		if (psa->blk_set[0].found_in_cache)
			psa->blks_cached++;
		return FALSE;	/* No restart needed */
	}

	/* Beginning of block update/split logic. We need to process the blocks in the reverse order from the
	   tree path. This means blk_set[0] which is actually the block we want to split must be the first
	   in our path. We then need to process the block array backwards in case the changes made to those
	   records cause subsequent splits.

	   First order of business is to find a suitable place to split this block .. Run through
	   the records in the block until we are "halfway" through the block. Split so that the first record
	   (after the first) whose end point is in the "second half" of the block will be the first record of
	   the second half or right hand side block after the split. This makes sure that the left side has at
	   least one record in it. We already know that this block has at least 2 records in it or it would not
	   need splitting.
	*/
	rec_p = blk_p + sizeof(v15_blk_hdr);
	blk_set_p->curr_rec = rec_p;
	dbc_find_key(psa, blk_set_p->curr_blk_key, rec_p, blk_set_p->blk_levl);
	GET_USHORT(us_rec_len, &((rec_hdr *)rec_p)->rsiz);
	curr_rec_len = us_rec_len;
	next_rec_p = rec_p + curr_rec_len;
	blk_set_p->curr_match = 0;		/* First record of block always cmpc 0 */
	blk_len = ((v15_blk_hdr_ptr_t)blk_p)->bsiz;
	blk_endp = blk_p + blk_len;
	mid_point = blk_p + blk_size / 2;
	do
	{	/* Keep scanning the next record until you find the split point which is the first record that straddles the
		 * mid-point of the block. This loop makes sure the prev_key and curr_key fields are correctly set when we
		 * enter the processing loop below.
		 */
		blk_set_p->prev_match = blk_set_p->curr_match;
		memcpy(blk_set_p->prev_blk_key, blk_set_p->curr_blk_key, (sizeof(dbc_gv_key) + blk_set_p->curr_blk_key->end));
		rec_p = next_rec_p;	/* Must be at least one record in LHS and one in RHS */
		blk_set_p->prev_rec = blk_set_p->curr_rec;
		blk_set_p->curr_rec = rec_p;
		GET_USHORT(us_rec_len, &((rec_hdr *)rec_p)->rsiz);
		curr_rec_len = us_rec_len;
		dbc_find_key(psa, blk_set_p->curr_blk_key, rec_p, blk_set_p->blk_levl);
		blk_set_p->curr_match = ((rec_hdr *)rec_p)->cmpc;
		next_rec_p = rec_p + curr_rec_len;
		if (next_rec_p >= blk_endp)	/* We have reached the last record in the block. Cannot skip anymore. */
			break;
		if (next_rec_p >= mid_point)
		{	/* The current record straddles the mid-point of the almost-full block. This is most likely going
			 * to be the split point. If splitting at the current record causes the RHS block to continue to
			 * be too-full and there is still room in the LHS block we will scan one more record in this loop.
			 * Scanning this one more record should make the RHS block no longer too-full. This is asserted below.
			 */
			/* Compute the sizes of the LHS and RHS blocks assuming the current record moves into each of them */
			if (blk_set_p->blk_levl)
			{	/* Index block. The current record is changed into a *-key (a simple star key rec) */
				new_lh_blk_len = (int)((rec_p - blk_p) + BSTAR_REC_SIZE);
			} else
			{	/* Data block. Always simple split (no inserted record) */
				new_lh_blk_len = (int)(next_rec_p - blk_p);
				assert(gdsblk_gvtleaf == blk_set_p->blk_type || gdsblk_dtleaf == blk_set_p->blk_type);
			}
			assert(0 < new_lh_blk_len);
			/* assert that the LHS block without the current record is guaranteed not to be too-full */
			assert((new_lh_blk_len - (next_rec_p - rec_p)) <= psa->max_blk_len);
			/* Right hand side has key of curr_rec expanded since is first key of blcok */
			new_rh_blk_len = (int)(sizeof(v15_blk_hdr) + blk_set_p->curr_match + blk_len - (rec_p - blk_p) );
			assert(0 < new_rh_blk_len);
			if ((new_rh_blk_len <= psa->max_blk_len) || (new_lh_blk_len > psa->max_blk_len))
				break;
			assert(FALSE == first_time);	/* assert we never scan more than one record past mid-point of the block */
			DEBUG_ONLY(first_time = TRUE;)
		}
	} while (TRUE);
	assert((rec_p - blk_p) < ((v15_blk_hdr_ptr_t)blk_p)->bsiz);

	/* Block processing loop */
	bottom_tree_index = psa->block_depth;	/* Record end of the tree in case need bit map blocks later */
	update_array_ptr = update_array;		/* Reset udpate array */
	DBC_DEBUG(("DBC_DEBUG: Beginning split processing loop\n"));
	for (completed = FALSE; !completed;)
	{	/* Backwards process until we hit a block with no changes to it */
		DBC_DEBUG(("DBC_DEBUG: ******** Top of blk process loop for block index %d\n", blk_index));
		assert(0 <= blk_index);
		blk_set_p = &psa->blk_set[blk_index];
		assert(blk_set_p->blk_len == ((v15_blk_hdr_ptr_t)blk_set_p->old_buff)->bsiz);
		assert(blk_set_p->blk_levl == ((v15_blk_hdr_ptr_t)blk_set_p->old_buff)->levl);
		curr_blk_len = blk_set_p->blk_len;
		curr_blk_levl = blk_set_p->blk_levl;
		if (0 != blk_set_p->ins_rec.ins_key->end)
		{
			ins_key_len = blk_set_p->ins_rec.ins_key->end + 1;
			ins_rec_len = ins_key_len + SIZEOF(block_id);	/* We only ever insert index records */
		} else
			ins_key_len = ins_rec_len = 0;
		blk_p = blk_set_p->old_buff;
		/* If ins_rec_len has a non-zero value, then we need to reset the values for prev_match and
		   key_match. These values were computed using the original scan key as their basis. Now we
		   are using these fields to insert a new key. The positioning is still correct but the
		   number of matching characters is potentially different.
		*/
		if (ins_rec_len)
		{
			if (0 != blk_set_p->prev_blk_key->end)
			{	/* There is a "previous record" */
				insert_point = dbc_match_key(blk_set_p->prev_blk_key, blk_set_p->blk_levl,
							     blk_set_p->ins_rec.ins_key, &blk_set_p->prev_match);
				assert(!insert_point);	/* This is prior to insert point (sanity check) */
			}
			insert_point = dbc_match_key(blk_set_p->curr_blk_key, blk_set_p->blk_levl,
						     blk_set_p->ins_rec.ins_key, &blk_set_p->curr_match);
			assert(insert_point);	/* This is supposed to *be* the insert point */
		}
		/* Make convenient copies of some commonly used record fields */
		curr_rec_cmpc = ((rec_hdr *)blk_set_p->curr_rec)->cmpc;
		curr_rec_shrink = blk_set_p->curr_match - curr_rec_cmpc;
		curr_rec_offset = (int)(blk_set_p->curr_rec - blk_set_p->old_buff);
		GET_USHORT(us_rec_len, &((rec_hdr *)blk_set_p->curr_rec)->rsiz);
		curr_rec_len = us_rec_len;
		prev_rec_offset = (int)(blk_set_p->prev_rec - blk_set_p->old_buff);
		got_root = (gdsblk_dtroot == blk_set_p->blk_type) || (gdsblk_gvtroot == blk_set_p->blk_type);
		/* Decide if this record insert (if an insert exists) will cause a block split or not. If this
		   is the first block in the tree (the one we got from the phase 1 file), there will be no insert.
		   If we find a block that does not need to change, we are done and can exit the loop.
		   This differs from the regular GT.M runtime which must keep checking even the split blocks
		   but since we never add data to a level 0 block being split, we will never create split-off
		   blocks that themselves are (still) too full.
		*/
		assert(gdsblk_read == blk_set_p->usage);
		new_blk_len = (int)(ins_rec_len ? (curr_blk_len + curr_rec_cmpc + sizeof(rec_hdr) + ins_rec_len
					      - blk_set_p->prev_match - blk_set_p->curr_match)
			       : curr_blk_len);		/* No inserted rec, size does not change */
		if (new_blk_len <= psa->max_blk_len)
		{	/* "Simple" case .. we do not need a block split - only (possibly) a record added. Note
			   that this is the only path where there may not be a "previous" record so we
			   have to watch for that possibility.
			*/
			assert(0 != blk_index);		/* Never insert a record into target blk so should never be here */
			/* In this path we should always have an inserted record length. We should have detected we
			   were done in an earlier loop iteration.
			*/
			assert(ins_rec_len);
			DBC_DEBUG(("DBC_DEBUG: Block index %d is a simple update\n", blk_index));
			/* We must have an insert at this point and since we only ever insert records into
			   index blocks, we must be in that situation */
			assert(0 != curr_blk_levl);
			blk_set_p->usage = gdsblk_update;	/* It's official .. blk is being modified */
			/* We have a record to insert into this block but no split is needed */
			BLK_INIT(bs_ptr, bs1);
			blk_set_p->upd_addr = bs1;		/* Save address of our update array */
			if (0 != blk_set_p->prev_blk_key->end)
			{	/* First piece is block prior to the record + key + value */
				BLK_SEG(bs_ptr,
					blk_set_p->old_buff + sizeof(v15_blk_hdr),
					(curr_rec_offset - sizeof(v15_blk_hdr)));
			}
			BLK_ADDR(ins_rec_hdr, sizeof(rec_hdr), rec_hdr);
			/* Setup new record header */
			new_rec_len = (int)(sizeof(rec_hdr) + ins_rec_len - blk_set_p->prev_match);
			ins_rec_hdr->rsiz = new_rec_len;
			ins_rec_hdr->cmpc = blk_set_p->prev_match;
			BLK_SEG(bs_ptr, (sm_uc_ptr_t)ins_rec_hdr, sizeof(rec_hdr));
			/* Setup key */
			BLK_ADDR(cp1,
				 blk_set_p->ins_rec.ins_key->end + 1 - blk_set_p->prev_match,
				 unsigned char);
			memcpy(cp1, blk_set_p->ins_rec.ins_key->base + blk_set_p->prev_match,
			       blk_set_p->ins_rec.ins_key->end + 1 - blk_set_p->prev_match);
			BLK_SEG(bs_ptr, cp1, blk_set_p->ins_rec.ins_key->end + 1 - blk_set_p->prev_match);
			/* Setup value (all index records have value of size "block_id". The proper value is
			   either there already or will be when we go to commit these changes. */
			BLK_SEG(bs_ptr, (sm_uc_ptr_t)&blk_set_p->ins_rec.blk_id, sizeof(block_id));
			/* For index blocks, we know that since a star key is the last record in the block
			   (which is the last record that can be curr_rec) that there is a trailing portion
			   of the block we need to output.
			*/
			BLK_ADDR(next_rec_hdr, sizeof(rec_hdr), rec_hdr);	/* Replacement rec header */
			next_rec_hdr->rsiz = curr_rec_len - curr_rec_shrink;
			next_rec_hdr->cmpc = blk_set_p->curr_match;
			BLK_SEG(bs_ptr, (sm_uc_ptr_t)next_rec_hdr, sizeof(rec_hdr));
			remain_offset = curr_rec_shrink + SIZEOF(rec_hdr);	/* Where rest of record plus any
										   further records begin */
			remain_len = curr_blk_len - curr_rec_offset;
			BLK_SEG(bs_ptr,
				blk_set_p->curr_rec + remain_offset,
				remain_len - remain_offset);
			if (0 == BLK_FINI(bs_ptr, bs1))
				GTMASSERT;
			assert(blk_seg_cnt == new_blk_len);
			DBC_DEBUG(("DBC_DEBUG: Stopping block scan after simple update (no further inserts to previous lvls)\n"));
			completed = TRUE;
			break;
		} else
		{	/* The block is either already too large or would be too large when the record is inserted
			   and so it must be split.

			   There are two different ways a block can be split. It can either be split so that:

			   (1) the inserted record is at the end of the left block or,

			   (2) the record is the first record in the right half.

			   Compute the left/right block sizes for these two options and see which one does not
			   force a secondary block split (one of them must be true here unlike in GT.M code because
			   here we are NEVER adding a record to a level 0 block, we only split lvl 0 blocks as
			   needed). Note that the case where we are splitting a level 0 block with no record insert
			   is treated as an unremarkable variant of option (1) as described above.

			   Follow the conventions of gvcst_put (LHS to new block, RHS to old block):

			   (1) If we are inserting the record into the lefthand side then a new split-off block will
			   receive the first part of the block including the record. The remainder of the block is
			   placed into the current (existing) block.

			   (2) If we are putting the record into the righthand side, then a new split-off block will
			   receive the first part of the block. The new record plus the remainder of the block is
			   placed into the current block.

			   The sole exception to the above is if a root block (either DT or GVT) is being split. In
			   that case, BOTH the LHS and RHS become NEW blocks and the root block is (a) increased in
			   level and (b) contains only entries for the two created blocks.

			   Note that gvcst_put has several additional checks and balances here that we are forgoing
			   such as making sure the blocks are as balanced as possible, concurrency concerns, etc. They
			   add un-needed complications to this one-time code. Any inefficiencies here can be undone
			   with a pass of MUPIP REORG.
			*/
			DBC_DEBUG(("DBC_DEBUG: Block index %d needs to be split\n", blk_index));
			/*  First up is split so that the inserted record (if any) is the last record in the left
			    hand block. Note if this is an index block, the last record must be a star key rec as per
			    option (1) above.
			*/
			if (curr_blk_levl)
				/* Index block. Two cases: (a) We are adding a key to the end in which case it is just
				   a simple star key rec or (b) No record is being added so the previous record is
				   changed into a star key rec.
				*/
				new_lh_blk_len = (int)(curr_rec_offset + BSTAR_REC_SIZE
					- (ins_rec_len ? 0 : (blk_set_p->curr_rec - blk_set_p->prev_rec)));
			else
			{
				/* Data block. Always simple split (no inserted record) */
				new_lh_blk_len = curr_rec_offset;
				assert(gdsblk_gvtleaf == blk_set_p->blk_type || gdsblk_dtleaf == blk_set_p->blk_type);
			}
			assert(0 < new_lh_blk_len);
			/* Right hand side has key of curr_rec expanded since is first key of blcok */
			new_rh_blk_len = (int)(sizeof(v15_blk_hdr) + ((rec_hdr *)blk_set_p->curr_rec)->cmpc +
					       (curr_blk_len - curr_rec_offset));
			assert(0 < new_rh_blk_len);
			/* Common initialization */
			++psa->block_depth;			/* Need a new block to split into */
			if (MAX_BLOCK_INFO_DEPTH <= psa->block_depth)
				GTMASSERT;
			DBC_DEBUG(("DBC_DEBUG: Block index %d used for newly created split (lhs) block\n", psa->block_depth));
			blk_set_new_p = &psa->blk_set[psa->block_depth];
			dbc_init_blk(psa, blk_set_new_p, -1, gdsblk_create, new_lh_blk_len, curr_blk_levl);
			if (got_root)
				/* If root, the LHS sub-block is a different type */
				blk_set_new_p->blk_type = (gdsblk_gvtroot == blk_set_p->blk_type)
					? gdsblk_gvtindex : gdsblk_dtindex;
			else
				blk_set_new_p->blk_type = blk_set_p->blk_type;
			/* Complete our LHS block */
			BLK_INIT(bs_ptr, bs1);		/* Our new block to create */
			blk_set_new_p->upd_addr = bs1;
			level_0 = (0 == curr_blk_levl);
			/* See if they fit in their respective blocks */
			if (level_0 || (new_lh_blk_len <= psa->max_blk_len) && (new_rh_blk_len <= psa->max_blk_len))
			{	/* Method 1 - record goes to left-hand side */
				DBC_DEBUG(("DBC_DEBUG: Method 1 block lengths: lh: %d  rh: %d  max_blk_len: %d\n", \
					   new_lh_blk_len, new_rh_blk_len, psa->max_blk_len));
				/* New update array for new block */
				if (level_0)
				{	/* Level 0 block, we are only splitting it -- never adding a record */
					assert(curr_rec_offset <= psa->max_blk_len);
					BLK_SEG(bs_ptr, blk_set_p->old_buff + sizeof(v15_blk_hdr),
						curr_rec_offset - sizeof(v15_blk_hdr));
					assert(0 == ins_rec_len);	/* Never insert records to lvl0 */
					if (new_rh_blk_len > psa->max_blk_len)
					{	/* Case of a data block that has a DBCREC2BIG error unnoticed by DBCERTIFY SCAN.
						 * Should not happen normally. But in case it does in production, we will handle
						 * it by NOT certifying the database and requiring a rerun of the SCAN
						 */
						assert(FALSE);
						gtm_putmsg(VARLSTCNT(6) ERR_DBCREC2BIGINBLK, 4,
							   blk_num, psa->dbc_cs_data->max_rec_size,
							   psa->dbc_gv_cur_region->dyn.addr->fname_len,
							   psa->dbc_gv_cur_region->dyn.addr->fname);
						psa->blk_process_errors++;	/* must be zero to certify db at end */
					}
				} else
				{	/* Index block -- may or may not be adding a record.
					   If adding a record, the inserted record becomes a star key record.
					   If not adding a record the last record is morphed into a star key record.
					*/
					BLK_SEG(bs_ptr, blk_set_p->old_buff + sizeof(v15_blk_hdr),
						(ins_rec_len ? curr_rec_offset : prev_rec_offset)
						- sizeof(v15_blk_hdr));
					BLK_ADDR(new_star_hdr, sizeof(rec_hdr), rec_hdr);
					new_star_hdr->rsiz = BSTAR_REC_SIZE;
					new_star_hdr->cmpc = 0;
					BLK_SEG(bs_ptr, (uchar_ptr_t)new_star_hdr, sizeof(rec_hdr));
					BLK_SEG(bs_ptr, (ins_rec_len ? (uchar_ptr_t)&blk_set_p->ins_rec.blk_id
							 : (blk_set_p->prev_rec + sizeof(rec_hdr)
							    + blk_set_p->prev_blk_key->end + 1
							    - ((rec_hdr *)blk_set_p->prev_rec)->cmpc)),
						sizeof(block_id));
				}
				/* Complete our LHS block */
				if (0 == BLK_FINI(bs_ptr, bs1))
					GTMASSERT;
				assert(blk_seg_cnt == new_lh_blk_len);
				/* Remember key of last record in this block */
				if (0 == ins_rec_len)
					last_rec_key = blk_set_p->prev_blk_key;
				else
					last_rec_key = blk_set_p->ins_rec.ins_key;
				if (!got_root)
				{	/* New block created, insert record to it in parent block. To do this we create
					   a record with the last key in this LH block to be inserted between curr_rec
					   and prev_rec of the parent block.
					*/
					if (0 == blk_index)
						blk_set_prnt_p = &psa->blk_set[bottom_tree_index]; /* Cycle back up to parent */
					else
						blk_set_prnt_p = blk_set_p - 1;
					assert(blk_set_prnt_p != &psa->blk_set[0]);
					assert(NULL != last_rec_key);
					/* Note: We do not need the "+ 1" on the key length since sizeof(dbc_gv_key) contains
					   the first character of the key so the "+ 1" to get the last byte of the key is
					   already integrated into the length
					*/
					memcpy(blk_set_prnt_p->ins_rec.ins_key, last_rec_key,
					       sizeof(dbc_gv_key) + last_rec_key->end);
					/* Setup so that creation of the blk_set_new_p block can then set its block id into
					   our parent block's insert rec buffer which will be made part of the inserted
					   record at block build time
					*/
					blk_set_new_p->ins_blk_id_p = &blk_set_prnt_p->ins_rec.blk_id;
					blk_set_rhs_p = blk_set_p;	/* Use original block for rhs */
					blk_set_rhs_p->usage = gdsblk_update;
				} else
				{	/* Have root block: need to put the RHS into a new block too */
					DBC_DEBUG(("DBC_DEBUG: Splitting root block, extra block to be created\n"));
					++psa->block_depth;			/* Need a new block to split into */
					if (MAX_BLOCK_INFO_DEPTH <= psa->block_depth)
						GTMASSERT;
					blk_set_rhs_p = &psa->blk_set[psa->block_depth];
					dbc_init_blk(psa, blk_set_rhs_p, -1, gdsblk_create, new_rh_blk_len, curr_blk_levl);
					/* We will put the pointers to both this block and the RHS we build next
					   into the original root block -- done later when RHS is complete */
					/* If root, the RHS sub-block is a different type */
					blk_set_rhs_p->blk_type = (gdsblk_gvtroot == blk_set_p->blk_type)
						? gdsblk_gvtindex : gdsblk_dtindex;
				}

				/**** Now build RHS into either current or new block ****/
				BLK_INIT(bs_ptr, bs1);
				blk_set_rhs_p->upd_addr = bs1;		/* Block building roadmap.. */
				BLK_ADDR(next_rec_hdr, sizeof(rec_hdr), rec_hdr);
				next_rec_hdr->rsiz = curr_rec_len + curr_rec_cmpc;
				next_rec_hdr->cmpc = 0;
				BLK_SEG(bs_ptr, (uchar_ptr_t)next_rec_hdr, sizeof(rec_hdr));
				/* Copy the previously compressed part of the key out of curr_rec. Note, if this
				   key is a star rec key, nothing is written because cmpc is zero */
				if (curr_rec_cmpc)
				{
					BLK_ADDR(cp1, curr_rec_cmpc, unsigned char);
					memcpy(cp1, blk_set_p->curr_blk_key->base, curr_rec_cmpc);
					BLK_SEG(bs_ptr, cp1, curr_rec_cmpc);
				}
				/* Remainder of existing block */
				BLK_SEG(bs_ptr,
					blk_set_p->curr_rec + sizeof(rec_hdr),
					curr_blk_len - curr_rec_offset - sizeof(rec_hdr));
				/* Complete update array */
				if (0 == BLK_FINI(bs_ptr, bs1))
					GTMASSERT;
				assert(blk_seg_cnt == new_rh_blk_len);
			} else
			{	/* Recompute sizes for inserted record being in righthand block as per
				   method (2) */
				DBC_DEBUG(("DBC_DEBUG: Method 1 created invalid blocks: lh: %d  rh: %d  " \
					   "max_blk_len: %d -- trying method 2\n", new_lh_blk_len, new_rh_blk_len, \
					   psa->max_blk_len));
				/* By definition we *must* have an inserted record in this path */
				assert(0 != ins_rec_len);
				/* New block sizes - note because we *must* be inserting a record in this method,
				   the only case considered here is when we are operating on an index block.
				*/
				assert(!level_0);
				/* Last record turns into star key record */
				new_lh_blk_len = (int)(curr_rec_offset + BSTAR_REC_SIZE -
						       (blk_set_p->curr_rec - blk_set_p->prev_rec) );
				assert(0 < new_lh_blk_len);
				new_rh_blk_len = (int)(sizeof(v15_blk_hdr) + sizeof(rec_hdr) +
						       ins_rec_len  + curr_blk_len - (curr_rec_offset) - curr_rec_shrink);
				assert(0 < new_rh_blk_len);
				if (new_lh_blk_len > psa->max_blk_len || new_rh_blk_len > psa->max_blk_len)
				{	/* This is possible if we are inserting a record into a block (and thus we are
					   not picking the insertion point) and the insertion point is either the first or
					   next-to-last record in the block such that neither method 1 nor 2 can create blocks
					   of acceptable size. In this case, although this problem block is likely on the
					   list of blocks to process, we cannot wait and thus must perform the split now.
					   To do that, we call this same routine recursively with the necessary parms to
					   process *THIS* block. Since this will destroy all the structures we had built
					   up, signal a transaction restart which will re-read everything and should allow
					   the transaction we were processing to proceed.
					*/
					if (curr_blk_len <= psa->max_blk_len)
						/* Well, that wasn't the problem, something else is wrong */
						rts_error(VARLSTCNT(8) ERR_DBCINTEGERR, 2,
							  RTS_ERROR_STRING((char_ptr_t)psa->ofhdr.dbfn),
							  ERR_TEXT, 2, RTS_ERROR_LITERAL("Unable to split block appropriately"));
					/* If we do have to restart, we won't be able to reinvoke dbc_split_blk() with the
					   parms taken from the current blk_set_p as that array will be overwritten by the
					   recursion. Save the current blk_set_p so we can use it in a restartable context.
					*/
					restart_blk_set = *blk_set_p;
					for (restart_cnt = 0, restart_transaction = TRUE;
					     restart_transaction;
					     ++restart_cnt)
					{
						if (MAX_RESTART_CNT < restart_cnt)
							GTMASSERT;		/* No idea what could cause this */
						DBC_DEBUG(("DBC_DEBUG: *** *** Recursive call to handle too large block 0x%x\n", \
							   restart_blk_set.blk_num));
						psa->block_depth_hwm = -1;	/* Zaps cache so all blocks are re-read */
						restart_transaction = dbc_split_blk(psa, restart_blk_set.blk_num,
										    restart_blk_set.blk_type, restart_blk_set.tn,
										    restart_blk_set.blk_levl);
					}
					return TRUE;	/* This transaction must restart */
				}
				DBC_DEBUG(("DBC_DEBUG: Method 2 block lengths: lh: %d  rh: %d  max_blk_len: %d\n", \
					   new_lh_blk_len, new_rh_blk_len, psa->max_blk_len));

				/* Start building (new) LHS block - for this index record, the record before the split
				   becomes a new *-key.

				   Note:  If the block split was caused by our appending the new record
				   to the end of the block, this code causes the record PRIOR to the
				   current *-key to become the new *-key.
				*/
				BLK_SEG(bs_ptr,
					blk_set_p->old_buff + sizeof(v15_blk_hdr),
					prev_rec_offset - sizeof(v15_blk_hdr));
				/* Replace last record with star key rec */
				BLK_ADDR(new_star_hdr, sizeof(rec_hdr), rec_hdr);
				new_star_hdr->rsiz = BSTAR_REC_SIZE;
				new_star_hdr->cmpc = 0;
				BLK_SEG(bs_ptr, (uchar_ptr_t)new_star_hdr, sizeof(rec_hdr));
				/* Output pointer from prev_rec as star key record's value */
				BLK_SEG(bs_ptr,	blk_set_p->curr_rec - sizeof(block_id), sizeof(block_id));
				/* Complete our LHS block */
				if (0 == BLK_FINI(bs_ptr, bs1))
					GTMASSERT;
				assert(blk_seg_cnt == new_lh_blk_len);
				if (!got_root)
				{
					/* New block created, insert record to it in parent block. To do this we create
					   a record with the last key in this LH block to be inserted between curr_rec
					   and prev_rec of the parent block.
					*/
					if (0 == blk_index)
						blk_set_prnt_p = &psa->blk_set[bottom_tree_index]; /* Cycle back up to parent */
					else
						blk_set_prnt_p = blk_set_p - 1;
					assert(blk_set_prnt_p != &psa->blk_set[0]);
					assert(NULL != blk_set_p->prev_blk_key);
					/* Note: We do not need the "+ 1" on the key length since sizeof(dbc_gv_key) contains
					   the first character of the key so the "+ 1" to get the last byte of the key is
					   already integrated into the length
					*/
					memcpy(blk_set_prnt_p->ins_rec.ins_key, blk_set_p->prev_blk_key,
					       sizeof(dbc_gv_key) + blk_set_p->prev_blk_key->end);
					/* Setup so that creation of the blk_set_new_p block can then set its block id into
					   our parent block's insert rec buffer which will be made part of the inserted
					   record at block build time
					*/
					blk_set_new_p->ins_blk_id_p = &blk_set_prnt_p->ins_rec.blk_id;
					blk_set_rhs_p = blk_set_p;	/* Use original block for rhs */
					blk_set_rhs_p->usage = gdsblk_update;
				} else
				{	/* Have root block: need to put the RHS into a new block too */
					DBC_DEBUG(("DBC_DEBUG: Splitting root block, extra block to be created\n"));
					++psa->block_depth;			/* Need a new block to split into */
					if (MAX_BLOCK_INFO_DEPTH <= psa->block_depth)
						GTMASSERT;
					blk_set_rhs_p = &psa->blk_set[psa->block_depth];
					/* Key for last record in the LHS block used to (re)construct root block */
					last_rec_key = blk_set_p->curr_blk_key;
					dbc_init_blk(psa, blk_set_rhs_p, -1, gdsblk_create, new_rh_blk_len, curr_blk_levl);
					/* We will put the pointers to both this block and the RHS we build next
					   into the original root block -- done later when RHS is complete */
					/* If root, the RHS sub-block is a different type */
					blk_set_rhs_p->blk_type = (gdsblk_gvtroot == blk_set_p->blk_type)
						? gdsblk_gvtindex : gdsblk_dtindex;
				}

				/**** Now build RHS into current block ****/
				BLK_INIT(bs_ptr, bs1);
				blk_set_rhs_p->upd_addr = bs1;		/* Block building roadmap.. */
				/* Build record header for inserted record. Inserted record is always for index
				   type blocks
				*/
				BLK_ADDR(ins_rec_hdr, sizeof(rec_hdr), rec_hdr);
				ins_rec_hdr->rsiz = sizeof(rec_hdr) + blk_set_p->ins_rec.ins_key->end + 1
					+ sizeof(block_id);
				ins_rec_hdr->cmpc = 0;
				BLK_SEG(bs_ptr, (uchar_ptr_t)ins_rec_hdr, sizeof(rec_hdr));
				/* Now for the inserted record key */
				BLK_SEG(bs_ptr,
					blk_set_p->ins_rec.ins_key->base,
					blk_set_p->ins_rec.ins_key->end + 1);
				/* Finally the inserted record value always comes from the block_id field. It is
				   not filled in now but will be when the block it refers to is created. */
				BLK_SEG(bs_ptr,	(uchar_ptr_t)&blk_set_p->ins_rec.blk_id, sizeof(block_id));
				/* Record that was first in RH side now needs its cmpc (and length) reset since
				   it is now the second record in the new block. */
				BLK_ADDR(next_rec_hdr, sizeof(rec_hdr), rec_hdr);
				next_rec_hdr->rsiz = curr_rec_len - curr_rec_shrink;
				next_rec_hdr->cmpc = blk_set_p->curr_match;
				BLK_SEG(bs_ptr, (uchar_ptr_t)next_rec_hdr, sizeof(rec_hdr));
				remain_offset = curr_rec_shrink + SIZEOF(rec_hdr);	/* Where rest of record plus any
											   further records begin */
				remain_len = curr_blk_len - curr_rec_offset;
				BLK_SEG(bs_ptr,
					blk_set_p->curr_rec + remain_offset,
					remain_len - remain_offset);
				if (0 == BLK_FINI(bs_ptr, bs1))
					GTMASSERT;
				assert(blk_seg_cnt == new_rh_blk_len);
			} /* else method (2) */
			if (got_root)
			{	/* If we have split a root block, we need to now set the pointers to the new LHS
				   and RHS blocks into the root block as the only records. Note this requires a
				   level increase of the tree. Hopefully we will not come across a database that is
				   already at maximum level. If so, the only way to reduce the level is to run
				   MUPIP REORG with a fairly recent vintage of GT.M
				*/
				BLK_INIT(bs_ptr, bs1);
				blk_set_p->usage = gdsblk_update;	/* It's official .. blk is being modified */
				blk_set_p->upd_addr = bs1;		/* Block building roadmap.. */
				blk_set_p->blk_levl++;			/* Needs to be at a new level */
				if (MAX_BT_DEPTH <= blk_set_p->blk_levl)
					/* Tree is too high */
					GTMASSERT;
				/* First record will have last key in LHS block */
				BLK_ADDR(next_rec_hdr, sizeof(rec_hdr), rec_hdr);
				next_rec_hdr->rsiz = sizeof(rec_hdr) + last_rec_key->end + 1 + sizeof(block_id);;
				next_rec_hdr->cmpc = 0;
				BLK_SEG(bs_ptr, (uchar_ptr_t)next_rec_hdr, sizeof(rec_hdr));
				BLK_SEG(bs_ptr, last_rec_key->base, (last_rec_key->end + 1));
				BLK_ADDR(lhs_block_id_p, sizeof(block_id), block_id);	/* First record's value */
				BLK_SEG(bs_ptr, (uchar_ptr_t)lhs_block_id_p, sizeof(block_id));
				blk_set_new_p->ins_blk_id_p = lhs_block_id_p;	/* Receives block id when created */
				/* Second record is a star key record pointing to the RHS block */
				BLK_ADDR(new_star_hdr, sizeof(rec_hdr), rec_hdr);
				new_star_hdr->rsiz = BSTAR_REC_SIZE;
				new_star_hdr->cmpc = 0;
				BLK_SEG(bs_ptr, (uchar_ptr_t)new_star_hdr, sizeof(rec_hdr));
				BLK_ADDR(rhs_block_id_p, sizeof(block_id), block_id);	/* First record's value */
				BLK_SEG(bs_ptr, (uchar_ptr_t)rhs_block_id_p, sizeof(block_id));
				blk_set_rhs_p->ins_blk_id_p = rhs_block_id_p;	/* Receives block id when created */
				/* Complete update array */
				if (0 == BLK_FINI(bs_ptr, bs1))
					GTMASSERT;
				/* The root block is the last one we need to change */
				DBC_DEBUG(("DBC_DEBUG: Stopping block scan as blk_index %d is a root block\n", blk_index));
				completed = TRUE;
				break;
			}
		} /* else need block split */
		if (0 != blk_index)
			blk_index--;	/* working backwards.. */
		else
			blk_index = bottom_tree_index;
	} /* while !completed */
	assert(completed);

	/* Check that we have sufficient free blocks to create the blccks we need (if any) */
	created_blocks = psa->block_depth - bottom_tree_index;
	if (created_blocks > psa->dbc_cs_data->trans_hist.free_blocks)
	{	/* We have a slight problem in that this transaction requires more free blocks than are
		   available. Our recourse is to flush the current file-header preserving any changes we
		   have already made, close the file and execute a mupip command to perform an extension before
		   re-opening the db for further processing.
		*/
		DBC_DEBUG(("DBC_DEBUG: Insufficient free blocks for this transaction - calling MUPIP EXTEND\n"));
		dbc_flush_fhead(psa);
		dbc_close_db(psa);
		extent_size = MAX(psa->dbc_cs_data->extension_size, created_blocks);
		/* Now build command file to perform the MUPIP EXTEND for the region */
		dbc_open_command_file(psa);
		dbc_write_command_file(psa, MUPIP_START);
		strcpy((char_ptr_t)psa->util_cmd_buff, MUPIP_EXTEND);
		strcat((char_ptr_t)psa->util_cmd_buff, (char_ptr_t)psa->ofhdr.regname);
		strcat((char_ptr_t)psa->util_cmd_buff, " "OPTDELIM"B=");
		chr_p = psa->util_cmd_buff + strlen((char_ptr_t)psa->util_cmd_buff);
		chr_p = i2asc(chr_p, extent_size);
		*chr_p = 0;
		dbc_write_command_file(psa, (char_ptr_t)psa->util_cmd_buff);
		UNIX_ONLY(dbc_write_command_file(psa, "EOF"));
		dbc_close_command_file(psa);
		dbc_run_command_file(psa, "MUPIP", (char_ptr_t)psa->util_cmd_buff, FALSE);
		/* Seeing as how it is very difficult to (in portable code) get a coherent error code back from
		   an executed command, we will just assume it worked, open the database back in and see if in
		   fact it did actually extend sufficiently. If not, this is a perm error and we stop here.
		*/
		dbc_init_db(psa);
		if (created_blocks > psa->dbc_cs_data->trans_hist.free_blocks)
			rts_error(VARLSTCNT(4) ERR_DBCNOEXTND, 2, RTS_ERROR_STRING((char_ptr_t)psa->ofhdr.dbfn));
		/* Database is now extended -- safest bet is to restart this particular update so that it is certain
		   nothing else got in besides the extention.
		*/
		DBC_DEBUG(("DBC_DEBUG: Restarting processing of this p1rec due to DB extension\n"));
		return TRUE;

	}
	/* The update arrarys are complete, we know there are sufficient free blocks in the database to accomodate
	   the splitting we have to do.
	*/
	bottom_created_index = psa->block_depth;	/* From here on out are bit map blocks */
	if (0 != created_blocks)
	{	/* Run through the created blocks assigning block numbers and filling the numbers into the buffers
		   that need them. If we didn't create any blocks, we know we didn't split any and there is nothing
		   to do for this p1 record.
		*/
		total_blks = psa->dbc_cs_data->trans_hist.total_blks;
		local_map_max = DIVIDE_ROUND_UP(total_blks, psa->dbc_cs_data->bplmap);
		DBC_DEBUG(("DBC_DEBUG: Assigning block numbers to created DB blocks\n"));
		for (blk_index = psa->block_depth, blk_set_p = &psa->blk_set[blk_index];
		     bottom_tree_index < blk_index; --blk_index, --blk_set_p)
		{
			assert(gdsblk_create == blk_set_p->usage);
			assert(NULL != blk_set_p->upd_addr);
			/* Find and allocate a database block for this created block */
			assert(NULL != blk_set_p->ins_blk_id_p);	/* Must be a place to put the block id */
			/* First find local bit map with some room in it */
			lclmap_not_full = bmm_find_free(psa->hint_blk / psa->dbc_cs_data->bplmap,
							(sm_uc_ptr_t)psa->dbc_cs_data->master_map,
							local_map_max);
			if (NO_FREE_SPACE == lclmap_not_full)
			{
				assert(FALSE);
				rts_error(VARLSTCNT(5) ERR_DBCINTEGERR, 2, RTS_ERROR_STRING((char_ptr_t)psa->ofhdr.dbfn),
					  ERR_BITMAPSBAD);
			}
			if (ROUND_DOWN2(psa->hint_blk, psa->dbc_cs_data->bplmap) != lclmap_not_full)
				psa->hint_lcl = 1;
			bitmap_blk_num = lclmap_not_full * psa->dbc_cs_data->bplmap;
			/* Read this bitmap in. Note it may already exist in the cache (likely for multiple creates) */
			lbm_blk_index = dbc_read_dbblk(psa, bitmap_blk_num, gdsblk_bitmap);
			blk_set_bm_p = &psa->blk_set[lbm_blk_index];
			assert(IS_BML(blk_set_bm_p->old_buff));	/* Verify we have a bit map block */
			assert(ROUND_DOWN2(blk_set_bm_p->blk_num, psa->dbc_cs_data->bplmap) == blk_set_bm_p->blk_num);
			if (ROUND_DOWN2(psa->dbc_cs_data->trans_hist.total_blks, psa->dbc_cs_data->bplmap) == bitmap_blk_num)
				/* This bitmap is the last one .. compute total blks in partial this bitmap */
				blks_this_lmap = (psa->dbc_cs_data->trans_hist.total_blks - bitmap_blk_num);
			else
				/* Regular bitmap (not last one) */
				blks_this_lmap = psa->dbc_cs_data->bplmap;
			lcl_map_p = blk_set_bm_p->old_buff + sizeof(v15_blk_hdr);
			lcl_blk = psa->hint_lcl = bm_find_blk(psa->hint_lcl, lcl_map_p, blks_this_lmap, &dummy_bool);
			if (NO_FREE_SPACE == lcl_blk)
			{
				assert(FALSE);
				rts_error(VARLSTCNT(5) ERR_DBCINTEGERR, 2, RTS_ERROR_STRING((char_ptr_t)psa->ofhdr.dbfn),
					  ERR_BITMAPSBAD);
			}
			/* Found a free block, mark it busy. Note that bitmap blocks are treated somewhat differently
			   than other blocks. We do not create an update array for them but just change the copy in
			   old_buff as appropriate.
			*/
			bml_busy(lcl_blk, lcl_map_p);
			blk_set_bm_p->usage = gdsblk_update;
			/* See if entire block is full - if yes, we need to mark master map too */
			psa->hint_lcl = bml_find_free(psa->hint_lcl, lcl_map_p, blks_this_lmap);
			if (NO_FREE_SPACE == psa->hint_lcl)
			{	/* Local map was filled .. clear appropriate master map bit */
				DBC_DEBUG(("DBC_DEBUG: -- Local map now full - marking master map\n"));
				bit_clear(bitmap_blk_num / psa->dbc_cs_data->bplmap, psa->dbc_cs_data->master_map);
			}
			assert(lcl_blk);	/* Shouldn't be zero as that is for the lcl bitmap itself */
			allocated_blk_num = psa->hint_blk = bitmap_blk_num + lcl_blk;
			DBC_DEBUG(("DBC_DEBUG: -- The newly allocated block for block index %d is 0x%x\n", \
				   blk_index, allocated_blk_num));
			/* Fill this block number in the places it needs to be filled */
			assert(-1 == blk_set_p->blk_num);
			blk_set_p->blk_num = allocated_blk_num;
			*blk_set_p->ins_blk_id_p = allocated_blk_num;
			psa->dbc_cs_data->trans_hist.free_blocks--;	/* There is one fewer free blocks tonite */
			assert(0 <= (int)psa->dbc_cs_data->trans_hist.free_blocks);
			psa->dbc_fhdr_dirty = TRUE;
		}
		/* Now that all the block insertions have been filled in, run the entire chain looking for
		   both created and updated blocks. Build the new versions of their blocks in new_buff.
		*/
		DBC_DEBUG(("DBC_DEBUG: Create new and changed blocks via their update arrays\n"));
		for (blk_index = psa->block_depth, blk_set_p = &psa->blk_set[blk_index]; 0 <= blk_index; --blk_index, --blk_set_p)
		{	/* Run through the update array for this block */
			blk_sega_p = (blk_segment *)blk_set_p->upd_addr;
			if (gdsblk_bitmap == blk_set_p->blk_type || gdsblk_read == blk_set_p->usage)
			{	/* Bitmap blocks are updated in place and of course read blocks have no updates */
				DBC_DEBUG(("DBC_DEBUG: -- Block index %d bypassed for type (%d) or usage (%d)\n", \
					   blk_index,  blk_set_p->blk_type, blk_set_p->usage));
				assert(NULL == blk_sega_p);
				continue;
			}
			DBC_DEBUG(("DBC_DEBUG: -- Block index %d being (re)built\n", blk_index));
			assert(NULL != blk_sega_p);
			new_blk_len = INTCAST(blk_sega_p->len);
			new_blk_p = blk_set_p->new_buff;
			((v15_blk_hdr_ptr_t)new_blk_p)->bsiz = blk_set_p->blk_len = new_blk_len;
			((v15_blk_hdr_ptr_t)new_blk_p)->levl = blk_set_p->blk_levl;
			/* VMS has an unalighed tn. All UNIX variants have an aligned TN */
			VMS_ONLY(PUT_ULONG(&((v15_blk_hdr_ptr_t)new_blk_p)->tn, psa->dbc_cs_data->trans_hist.curr_tn));
			UNIX_ONLY(((v15_blk_hdr_ptr_t)new_blk_p)->tn = psa->dbc_cs_data->trans_hist.curr_tn);
			new_blk_p += sizeof(v15_blk_hdr);
			for (blk_array_top = (blk_segment *)blk_sega_p->addr, blk_sega_p++;
			     blk_sega_p <= blk_array_top; blk_sega_p++)
			{	/* Start with first subtantive array entry ([1]) and do the segment move thing */
				memcpy(new_blk_p, blk_sega_p->addr, blk_sega_p->len);
				new_blk_p += blk_sega_p->len;
			}
			assert((new_blk_p - blk_set_p->new_buff) == new_blk_len);
		}
		/* One last pass through the block list to do the physical IO on the database */
		psa->fc->op = FC_WRITE;
		psa->fc->op_len = blk_size;		/* Just write the full block out regardless */
		DBC_DEBUG(("DBC_DEBUG: Flush all modified blocks out to disk for this transaction\n"));
		psa->dbc_critical = TRUE;
		for (blk_index = psa->block_depth, blk_set_p = &psa->blk_set[blk_index]; 0 <= blk_index; --blk_index, --blk_set_p)
		{	/* Output all modified/created blocks */
			if (gdsblk_create != blk_set_p->usage)
			{	/* We read everything but created blocks and some of them were found in cache */
				psa->blks_read++;
				if (blk_set_p->found_in_cache)
					psa->blks_cached++;
			}
			if (gdsblk_read == blk_set_p->usage)
			{
				DBC_DEBUG(("DBC_DEBUG: -- Block index %d bypassed for usage (read)\n", blk_index));
				continue;	/* Nothing to do for read-only block */
			}
			if (gdsblk_bitmap == blk_set_p->blk_type)
			{	/* Bitmap blocks are built in old_buff, swap with new_buff. This also lets the
				   buffer be reused correctly (by dbc_read_dbblk) if we read this block into
				   the same place later.
				*/
				blk_p = blk_set_p->new_buff;
				blk_set_p->new_buff = blk_set_p->old_buff;
				blk_set_p->old_buff = blk_p;
			}
			DBC_DEBUG(("DBC_DEBUG: -- Block index %d being written as block 0x%x\n", blk_index, \
				   blk_set_p->blk_num));
			psa->fc->op_buff = blk_set_p->new_buff;
			psa->fc->op_pos = psa->dbc_cs_data->start_vbn + (blk_size / DISK_BLOCK_SIZE) * blk_set_p->blk_num;
			dbcertify_dbfilop(psa);
			if (gdsblk_create == blk_set_p->usage)
				psa->blks_created++;
			else
				psa->blks_updated++;
		}
		psa->dbc_critical = FALSE;
		if (forced_exit)
		{	/* Our exit was deferred until we cleared the critical section area */
			UNIX_ONLY(dbcertify_deferred_signal_handler());
			VMS_ONLY(sys$exit(exi_condition));
		}

		/* Update the transaction number in the fileheader for the next transaction */
		psa->dbc_cs_data->trans_hist.curr_tn++;
		psa->dbc_fhdr_dirty = TRUE;
	} else
		GTMASSERT;	/* If we got this far we should have split a block which would create a block */
	DBC_DEBUG(("DBC_DEBUG: Block processing completed\n"));
	psa->blks_processed++;

	return FALSE;	/* No transaction restart necessary */
}

/* Flush the file-header back out to the database */
void dbc_flush_fhead(phase_static_area *psa)
{
	if (!psa->dbc_fhdr_dirty)
		return;		/* Nothing to do if it wasn't dirtied */
	psa->fc->op = FC_WRITE;
	psa->fc->op_buff = (uchar_ptr_t)psa->dbc_cs_data;
	psa->fc->op_pos = 1;
	psa->fc->op_len = sizeof(v15_sgmnt_data);
	dbcertify_dbfilop(psa);
	psa->dbc_fhdr_dirty = FALSE;
	return;
}

/* Read the next output record into the buffer provided */
void dbc_read_p1out(phase_static_area *psa, void *obuf, int olen)
{
	int		rc, save_errno;
	char_ptr_t	errmsg;

	DOREADRC(psa->outfd, obuf, olen, rc);
	if (-1 == rc)
	{
		save_errno = errno;
		errmsg = STRERROR(save_errno);
		assert(FALSE);
		rts_error(VARLSTCNT(11) ERR_SYSCALL, 5, RTS_ERROR_LITERAL("read()"), CALLFROM,
			  ERR_TEXT, 2, RTS_ERROR_STRING(errmsg));
	}
}

/* Exit/cleanup routine */
void dbc_certify_phase_cleanup(void)
{
	phase_static_area	*psa;

	psa = psa_gbl;
	if (psa->dbc_gv_cur_region && psa->dbc_gv_cur_region->dyn.addr && psa->dbc_gv_cur_region->dyn.addr->file_cntl)
	{
		dbc_flush_fhead(psa);
		dbc_close_db(psa);
		if (psa->dbc_critical)
			rts_error(VARLSTCNT(4) ERR_TEXT,
				  2, RTS_ERROR_LITERAL("Failure while in critical section -- database damage likely"));
	}
	UNIX_ONLY(dbc_release_standalone_access(psa));
	if (psa_gbl->tmp_file_names_gend)
	{	/* Only close/delete if we know what they are */
		if (psa->tcfp)
			dbc_close_command_file(psa);
		if (!psa->keep_temp_files)
			dbc_remove_command_file(psa);
		if (psa->trfp)
			dbc_close_result_file(psa);
		if (!psa->keep_temp_files)
			dbc_remove_result_file(psa);
	}
}
